b8f563199abc7cc500778b9edb740c155b4bc484,watchdog/src/main/java/com/continuuity/metrics/query/BatchMetricsHandler.java,BatchMetricsHandler,computeQueueLength,#MetricsRequest#,227
Before Change
// Walk every aggregate matching the "q.ack" metric prefix for this request. Two things are
// collected in a single pass: the running sum of the values, and the set of queues that the
// matched metrics refer to.
AggregatesScanner scanner = aggregatesTable.scan(
  metricsRequest.getContextPrefix(), "q.ack", metricsRequest.getRunId(), metricsRequest.getTagPrefix());
Set<QueueName> queueNames = Sets.newHashSet();
long ack = 0;
while (scanner.hasNext()) {
  AggregatesScanResult scanResult = scanner.next();
  ack += scanResult.getValue();

  // Drop the leading "q.ack." characters from the metric name; what is left is parsed as the queue URI.
  String metricName = scanResult.getMetric();
  String queueUri = metricName.substring("q.ack.".length());
  queueNames.add(QueueName.from(URI.create(queueUri)));
}
After Change
// and subtract the two to get queue length.
AggregatesScanner scanner = aggregatesTable.scan(metricsRequest.getContextPrefix(),
"process.events.processed",
metricsRequest.getRunId(),
"input");
long processed = 0;
Set<QueueName> queueNames = Sets.newHashSet();
while (scanner.hasNext()) {
AggregatesScanResult scanResult = scanner.next();
processed += scanResult.getValue();
// tag is of the form input.[queueURI]. ex: input.queue://PurchaseFlow/reader/queue
String tag = scanResult.getTag();